package network

import (
	"context"
	"net"
	"qweewq.tk/flyff/pgk/common"
	"qweewq.tk/flyff/pgk/common/event"
	"sync"
	"sync/atomic"
)

type TcpServe struct {
	event.DefaultEventReactor
	IOHandle
	cb         ITcpHandle
	cxt        context.Context
	cancelFunc context.CancelFunc
	wg         sync.WaitGroup
	listener   net.Listener
	connPool   map[int32]connItem
	poolMxt    sync.Mutex
	evLoop     *event.EventLoop
}

func (self *TcpServe) OnCmdHandle(id int32, evCmd int64, extras []interface{}) {
	switch evCmd {
	case accept:
		go self.accept()
	case read:
		go self.IOHandle.rHandle(extras[0].(int32))
	case write:
		self.IOHandle.wHandle(extras[0].(int32), extras[1].([]byte))
	case close:
		self.close(extras[0].(int32))
	default:
		panic("system failed!")
	}
}

func (self *TcpServe) OnFinishEventLoop(id int32) {
	self.cb.OnFinish(id)
}

func (self *TcpServe) Launch() int32 {
	if self == nil {
		return -1
	}

	return self.evLoop.Launch()
}

func (self *TcpServe) Run() {
	self.evLoop.PostMsg(accept, nil)
}

func (self *TcpServe) CloseConn(connId int32) {
	self.evLoop.PostMsg(close, connId)
}

func (self *TcpServe) StopAndWait() {
	if self == nil {
		return
	}
	self.cancelFunc()
	self.listener.Close()
	self.evLoop.StopAndWait()
}

func (self *TcpServe) IsExistConnId(connId int32) bool {
	self.poolMxt.Lock()
	defer self.poolMxt.Unlock()

	_, ok := self.connPool[connId]
	return ok
}

func (self *TcpServe) SendData(connId int32, data []byte) {
	buf := make([]byte, len(data))
	l := copy(buf, data)

	if l != len(data) {
		panic("system fail.")
	}

	self.evLoop.PostMsg(write, connId, buf)
}

func (self *TcpServe) SyncSendData(connId int32, data []byte) bool {
	cItem, ok := self.tackOutConn(connId)

	if ok {
		dataLen := len(data)
		var sOffset = 0
		cItem.wMtx.Lock()
		defer cItem.wMtx.Unlock()

		for dataLen != sOffset {
			l, err := cItem.conn.Write(data[sOffset:])
			if err != nil {
				self.cb.OnError(err)
				return false
			}
			sOffset += l
		}
		return true
	}

	return false
}

func (self *TcpServe) rHandle(connId int32) {
	cItem, ok := self.tackOutConn(connId)

	if !ok {
		panic("system failed.")
	}

	var content = make([]byte, defaultBufSize)
exit:
	for {
			n, err := cItem.conn.Read(content[0:])
			if err != nil {
				self.cb.OnError(err)
				break exit
			}

			err = self.cb.OnReceived(connId, content[:n])
			if err != nil {
				self.cb.OnError(err)
				break exit
			}
	}

	self.evLoop.PostMsg(close, connId)
}

func (self *TcpServe) wHandle(connId int32, data []byte) {
	cItem, ok := self.tackOutConn(connId)
	if !ok {
		panic("system failed.")
	}

	dataLen := len(data)
	var sOffset = 0
	cItem.wMtx.Lock()
	defer cItem.wMtx.Unlock()
exit:
	for dataLen != sOffset {
		l, err := cItem.conn.Write(data[sOffset:])
		if err != nil {
			self.cb.OnError(err)
			self.evLoop.PostMsg(close, connId)
			break exit
		}

		sOffset += l
	}
}

func (self *TcpServe) tackOutConn(connId int32) (connItem, bool) {
	self.poolMxt.Lock()
	defer self.poolMxt.Unlock()

	item, ok := self.connPool[connId]
	return item, ok
}

func (self *TcpServe) verity() {
	if self == nil {
		return
	}
	switch self.listener.(type) {
	case *net.TCPListener:
		break
	default:
		panic("This type of connection is not currently supported.")
	}
}

func (self *TcpServe) addConnPool(connId int32, conn net.Conn) {
	self.poolMxt.Lock()
	defer self.poolMxt.Unlock()
	self.connPool[connId] = connItem{conn:conn}
}

func (self *TcpServe) removeConn(connId int32) {
	self.poolMxt.Lock()
	defer self.poolMxt.Unlock()
	cItem, ok := self.connPool[connId]
	if ok {
		delete(self.connPool, connId)
		cItem.conn.Close()
	}
}

func (self *TcpServe) accept() {
	self.wg.Add(1)
exit:
	for {
		select {
		case <-self.cxt.Done():
			break exit
		default:
			{
				conn, err := self.listener.Accept()
				if err != nil {
					self.cb.OnError(err)
					continue
				}

				var connId = atomic.AddInt32(&connNum, 1)
				for self.IsExistConnId(connId) {
					connId = atomic.AddInt32(&connNum, 1)
				}

				self.addConnPool(connId, conn)
				self.cb.OnAccept(connId, conn)
				self.evLoop.PostMsg(read, connId)
			}
		}
	}
	self.wg.Done()
}

func (self *TcpServe) close(connId int32) {
	self.removeConn(connId)
	self.cb.OnClosed(connId)
}

func NewTcpServe(cxt context.Context, listener net.Listener, cb ITcpHandle, options ...common.Option) *TcpServe {
	cancel, f := context.WithCancel(cxt)

	t := new(TcpServe)
	t.cb = cb
	t.cxt = cancel
	t.cancelFunc = f
	t.listener = listener
	t.connPool = make(map[int32]connItem)
	t.verity()
	t.IOHandle = t
	t.evLoop = event.NewEventLoop(cxt, t)


	for _, op := range options {
		switch op.(type) {
		case common.EvLoopQueOp:
			t.evLoop.InitCmdQueueSize(op.GetValue().(uint32))
		}
	}
	return t
}

